import geopandas as gpd
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
from tobler.util import h3fy
from tobler.area_weighted import area_interpolate
import numpy as np
import warnings
# Read LOR (Berlin planning areas, "Planungsraum") as GeoDataFrame and show
plr = gpd.read_file("data/LOR_SHP_2019/Planungsraum_EPSG_25833.shp")
plr = plr.set_crs('EPSG:25833') # ETRS89 / UTM zone 33N — metric CRS, so areas/distances below are in metres
plr.plot(figsize=(8,8))
<AxesSubplot:>
# Calculate a hexagonal H3 grid covering the LOR polygons (resolution 8)
grid = h3fy(plr, resolution=8)
grid.plot(figsize=(8,8))
<AxesSubplot:>
# Read population on LOR level and join with geo data
# E_E holds the inhabitant count per LOR; RAUMID matches the shapefile's SCHLUESSEL key
df_pop = pd.read_csv(
    "data/EWR201912E_Matrix.csv",
    sep=";",
    usecols=["RAUMID", "E_E"],
    dtype={"RAUMID": str}  # read as string — presumably to keep leading zeros of the area keys
)
plr = plr.join(df_pop.set_index("RAUMID"), on="SCHLUESSEL")
plr.plot("E_E", figsize=(12,12), legend=True)
<AxesSubplot:>
Try to distribute the exact population numbers down to street-block level, where only rough category counts are published
# Load the street blocks and keep only what the distribution needs
blocks = gpd.read_file("data/s_rbs_bloecke.json")
# metadata columns are not used below
blocks = blocks.drop(['id','bez','bezname','datum'], axis=1)
# remove uninhabited blocks
blocks = blocks[blocks.ewk != 'unbewohnt'].copy()
blocks.info()
print("\n\nInhabitant categories", blocks.ewk.unique())
<class 'geopandas.geodataframe.GeoDataFrame'> Int64Index: 12656 entries, 1 to 15804 Data columns (total 6 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 blknr 12656 non-null object 1 plr 12656 non-null object 2 blktypname 12656 non-null object 3 ewk 12656 non-null object 4 area 12656 non-null object 5 geometry 12656 non-null geometry dtypes: geometry(1), object(5) memory usage: 692.1+ KB Inhabitant categories ['100-999 Einwohner' 'mehr als 1.000 Einwohner' '1-9 Einwohner' '10-99 Einwohner']
# plot LOR level and blocks together (blocks in red on top of the LOR base map)
base = plr.plot(figsize=(25,25))
blocks.plot(ax=base, color='red')
<AxesSubplot:>
# Visually check if blocks align with LOR:
# draw six sample LORs with their street blocks overlaid in red
fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(12, 12))
test_keys = ["01011101", "01011102","01011103","01011104", "03040614", "09041403"]
for idx, key in enumerate(test_keys):
    ax = axes[idx // 2, idx % 2]
    plr[plr.SCHLUESSEL == key].plot(ax=ax)
    ax.set_axis_off()
    blocks[blocks.plr == key].plot(ax=ax, color="red", edgecolor="black", alpha=0.5)
# The actual distribution.
# Strategy: For every LOR
#  * find all blocks
#  * distribute all inhabitants
#    * first the minimum on every block: 100-999 ==> 100
#    * then the middle of each block  : 100-999 ==> 500
#    * then the rest equally into the 1000+ blocks
# This is not exact but easy enough
# Inhabitants Map: bucket minimum -> published category label
im = {1000: "mehr als 1.000 Einwohner",
      100: "100-999 Einwohner",
      10: "10-99 Einwohner",
      1: "1-9 Einwohner"}
# estimated exact inhabitants per block, filled in by distribute_inhabitants below
blocks["num"] = 0.0
def distribute_inhabitants(plr_key):
    """Distribute the exact LOR population of *plr_key* over its street blocks.

    Strategy:
      * give every block the minimum of its inhabitant category,
      * raise the small categories to the middle of their bucket if
        enough inhabitants are left,
      * spread the rest evenly over the 1000+ blocks; the remainder of
        the floor division goes to the first 1000+ block.
    The result is written into blocks['num'].
    """
    # NOTE(review): `gdf` is defined in another (not visible) cell; it appears
    # to be the LOR GeoDataFrame joined with E_E (like `plr` above) — confirm.
    inhabitants_left = gdf[gdf.SCHLUESSEL == plr_key].E_E.iloc[0]
    # Work on a copy to avoid SettingWithCopyWarning; copied back at the end.
    blks = blocks[blocks.plr == plr_key].copy()
    counts = blks.ewk.value_counts()  # blocks per category, computed once
    # For all blocks, add minimum of bucket
    for b in [1, 10, 100, 1000]:
        nb = counts.get(im[b], 0)
        if nb > 0:
            blks.loc[blks.ewk == im[b], 'num'] = b
            inhabitants_left -= nb * b
    # for small buckets, add up to the middle of the bucket (e.g. 100-999 ==> 500)
    for b in [1, 10, 100]:
        nb = counts.get(im[b], 0)
        d = b * 4  # delta from the bucket minimum to its middle
        # mid of bucket only if enough inhabitants left
        if nb > 0 and nb * d < inhabitants_left:
            blks.loc[blks.ewk == im[b], 'num'] += d
            inhabitants_left -= nb * d
    # TODO: if not enough left, iterate through buckets until empty
    # spread the rest evenly over the 1000+ blocks
    nb = counts.get(im[1000], 0)
    if nb > 0:
        d = np.floor(inhabitants_left / nb)
        blks.loc[blks.ewk == im[1000], 'num'] += d
        inhabitants_left -= nb * d
        # remainder of the floor division goes to the first 1000+ block
        # (works now because blks is a real copy, not a chained-indexing view)
        first_big = blks.index[blks.ewk == im[1000]][0]
        blks.loc[first_big, 'num'] += inhabitants_left
        inhabitants_left = 0
    # copy data back to the main dataframe
    blocks.loc[blocks.plr == plr_key, 'num'] = blks.num
# Run the distribution for every LOR key;
# a single failing key must not abort the whole run
for key in gdf.SCHLUESSEL:
    try:
        distribute_inhabitants(key)
    except Exception as e:
        # print the cause instead of swallowing it silently (was a bare except)
        print('Problem with', key, e)
# Check the error: difference between the published LOR totals
# and what was actually distributed onto the blocks
print("distributed", blocks.num.sum())
print("original", gdf.E_E.sum())
print("error", gdf.E_E.sum() - blocks.num.sum())
C:\Users\Paul\anaconda3\envs\geo\lib\site-packages\pandas\core\indexing.py:1763: SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame. Try using .loc[row_indexer,col_indexer] = value instead See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy isetter(loc, value) C:\Users\Paul\anaconda3\envs\geo\lib\site-packages\pandas\core\indexing.py:1743: SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame. Try using .loc[row_indexer,col_indexer] = value instead See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy isetter(ilocs[0], value)
Problem with None Problem with None distributed 3151995.0 original 3769495.0 error 617500.0
# Visual check of the estimated inhabitants on block level
blocks.plot("num", figsize=(20,20), legend=True)
<AxesSubplot:>
Now that we have estimated the inhabitants at block level, we merge that information with the hex grid.
We call the measure of how inhabited a grid cell is its weight.
# Default value 0 for weight -> uninhabited
grid["weight"] = 0.0
# For every cell in grid
for i, cell in grid.iterrows():
    try:
        # find all blocks that intersect the cell
        # (computed once and reused — the original evaluated intersects() twice)
        b = blocks[blocks.intersects(cell.geometry)]
        # calculate the intersection polygons
        ib = b.intersection(cell.geometry)
        # weight is the sum over all intersecting blocks:
        # share of the block inside the cell * inhabitants in the block
        grid.loc[i, "weight"] = (ib.area / b.area * b.num).sum()
    except Exception:
        # keep going if a single cell fails (cause not investigated here)
        print("Problem with", i)
# We need the log of weight because very few blocks have a very high number of inhabitants
# We use log1p(x) = log(x+1) to avoid calculating log(0) for uninhabited cells
grid["log_weight"] = np.log1p(grid["weight"])
grid.plot("log_weight", figsize=(20,20), legend=True)
Problem with 881f18b757fffff
<AxesSubplot:>
Now we add the test centers to our grid
For grid cells with a test center, we set travel time to 0
For grid cells in the direct neighborhood, we set travel time to 1
# Using the data provided by https://test-to-go.berlin/
places = pd.read_json("data/test-places.json")
# lng/lat columns -> point geometries
places = gpd.GeoDataFrame(places, geometry=gpd.points_from_xy(places.lng, places.lat))
# Test-To-Go uses GPS coordinates. We need to transform it to the projection we use for our grid
places = places.set_crs('epsg:4326') # GPS (WGS84)
places = places.to_crs('EPSG:25833') # ETRS89 / UTM zone 33N
# Seed the travel time column:
# 0 for cells containing a test center, 1 for their direct neighbours,
# infinity for everything else (calculated later)
grid["travel"] = float("inf")
# one sixth of the hexagon perimeter = the edge length
r = grid.iloc[0].geometry.length / 6
# This can probably be written without a loop if you know (geo)pandas better
for i, cell in grid.iterrows():
    if places.within(cell.geometry).any():
        grid.loc[i, "travel"] = 0.0
for i, cell in grid.iterrows():
    if not np.isinf(cell.travel):
        continue
    close_by = grid.distance(cell.geometry) < r / 2
    if (close_by & (grid.travel < 1.0)).any():
        grid.loc[i, "travel"] = 1.0
grid.plot("travel", figsize=(20,20), categorical= True, legend=True)
<AxesSubplot:>
For grid cells where we don't have a test center, we calculate the travel time with public transport to the nearest cell with a center.
# To avoid recalculating the travel times every time, we save/load them to csv
# index: (from-cell, to-cell) ids, value: travel time in seconds
travel_times = pd.read_csv(
    'data/travel_times.csv',
    index_col=[0, 1],
    header=1, names=['from', 'to', 'seconds'],
).squeeze("columns")  # the squeeze=True kwarg is deprecated/removed in newer pandas
# make sure every (from, to) pair of grid cells is present; NaN = not calculated yet
travel_times = travel_times.reindex(
    index=pd.MultiIndex.from_product([grid.index, grid.index], names=['from', 'to']),
    fill_value=np.NaN
)
travel_times
from to
881f1d4995fffff 881f1d4995fffff NaN
881f188713fffff NaN
881f1d4913fffff NaN
881f18b355fffff NaN
881f1d4e31fffff NaN
..
881f1d4eb9fffff 881f1d4965fffff NaN
881f18b757fffff NaN
881f1d4b1dfffff NaN
881f18b419fffff NaN
881f1d4eb9fffff NaN
Name: seconds, Length: 1827904, dtype: float64
# This is a hack to skip the execution of this cell
# when the travel times were loaded from CSV
class StopExecution(Exception):
    """Raised to stop a notebook cell quietly (no traceback is rendered)."""
    def _render_traceback_(self):
        pass
# Fix: after the reindex above, travel_times.size is always len(grid)**2 (> 0),
# so the original `size > 0` test skipped unconditionally.
# Count the non-NaN entries to detect actually loaded data instead.
if travel_times.count() > 0:
    print('Skipping because travel times have been loaded from CSV')
    raise StopExecution
import requests
import json
import dateutil
import datetime
import calendar
from ratelimiter import RateLimiter
import sys
# Set departure to next monday, 9:00
# because APIs don't allow requests too far away from today.
# Import the submodule explicitly: a plain `import dateutil` does not
# guarantee that dateutil.relativedelta is loaded.
from dateutil.relativedelta import relativedelta
departure = (
    datetime.date.today() + relativedelta(weekday=calendar.MONDAY, hour=9)
).astimezone().isoformat()
departure
# number of nearest test-center cells to try per grid cell (see loop below)
num_test_travel = 5
# fresh, all-NaN travel time Series over every (from, to) pair of grid cells
travel_times = pd.Series(
    dtype='float64',
    index=pd.MultiIndex.from_product([grid.index, grid.index], names=['from', 'to'])
)
# The calculation will take a long time and we need some visualization that it's still going
def printProgressBar(i, max, postText):
    """Redraw a one-line text progress bar, e.g. ``[====      ] 40% text``."""
    # Adapted from https://stackoverflow.com/a/58602365/912189
    bar_width = 10
    fraction = i / max
    filled = '=' * int(bar_width * fraction)
    sys.stdout.write('\r')
    sys.stdout.write(f"[{filled:{bar_width}s}] {int(100 * fraction)}% {postText}")
    sys.stdout.flush()
url_base = "https://v5.vbb.transport.rest/journeys"
# Using RateLimiter to stay within the limits the API expects
@RateLimiter(max_calls=100, period=60)
def calc_travel(fr, to):
    """Fetch the public-transport travel time between two grid cells.

    fr, to: rows of `grid` (as yielded by iterrows); the cell id is in .name.
    On success the travel time in seconds is stored in travel_times[fr, to].
    Pairs that already have a value are skipped.
    """
    if not np.isnan(travel_times[fr.name, to.name]):
        # already calculated
        return
    try:
        import dateutil.parser  # make sure the submodule is actually loaded
        # the API works with GPS coordinates so we have to transform from our
        # coordinate system; the centers of the grid cells are used
        fr_gps = gpd.GeoSeries(fr.geometry.centroid).set_crs('EPSG:25833').to_crs('epsg:4326').iloc[0]
        to_gps = gpd.GeoSeries(to.geometry.centroid).set_crs('EPSG:25833').to_crs('epsg:4326').iloc[0]
        params = {
            "from.longitude": fr_gps.x,
            "from.latitude": fr_gps.y,
            # API requires an address but doesn't use it if coordinates are present
            "from.address": "dummy_address",
            "to.longitude": to_gps.x,
            "to.latitude": to_gps.y,
            "to.address": "dummy_address",
            "departure": departure,
            "results": 1,
        }
        res = requests.get(url_base, params=params).json()
        dep = dateutil.parser.parse(departure).timestamp()
        # hoping the JSON is completely there; a missing key lands in the except
        arr = dateutil.parser.parse(res['journeys'][0]['legs'][-1]['plannedArrival']).timestamp()
        travel_times[fr.name, to.name] = arr - dep
    # was a bare `except:` (http://wiki.c2.com/?PokemonExceptionHandling);
    # Exception keeps Ctrl-C working and we report the cause
    except Exception as e:
        print('Problem calculating travel time at', fr.name, to.name, e)
cnt = 0
# number of grid cells that still need a travel time.
# Fix: `.size` counted rows*columns of the sub-frame, which overstated the
# progress-bar total; len() counts rows (one per cell).
total = len(grid.loc[np.isinf(grid.travel)])
for _, cFrom in grid.iterrows():
    if np.isfinite(cFrom.travel):
        continue
    # We can't calculate the travel time to all test centers
    # So we assume that the geographically closest n=5 grid cells
    # are also the fastest by travel time
    likely_nearest = grid.loc[
        grid.loc[grid.travel == 0].distance(cFrom.geometry)
            .sort_values().head(num_test_travel).index.values
    ]
    for _, cTo in likely_nearest.iterrows():
        calc_travel(cFrom, cTo)
    cnt += 1
    printProgressBar(cnt, total, "\nProgress\n")
travel_times.dropna().to_csv('data/travel_times.csv')
Skipping because travel times have been loaded from CSV
# Merge grid with travel times:
# for every cell the fastest reachable target cell and the time to it.
# Cells whose travel times are all NaN are removed by dropna().
min_travel = grid.apply(lambda r: {'travel': travel_times[r.name].min(),
                                   'to': travel_times[r.name].idxmin()},
                        axis=1, result_type='expand').dropna()
# drop unrealistic values (8000 s is roughly 2.2 hours)
min_travel = min_travel[min_travel.travel < 8000]
# add travel time (converted from seconds to minutes) to grid
grid.loc[min_travel.index, 'travel'] = min_travel['travel'] / 60
grid.loc[min_travel.index, 'to'] = min_travel['to']
# tag error cases; note NOT_INHABITED overwrites NOT_CALCULATED where both apply
grid.loc[np.isinf(grid.travel), 'error_case'] = 'NOT_CALCULATED'
grid.loc[grid.weight == 0, 'error_case'] = 'NOT_INHABITED'
Now we combine number of inhabitants with travel time to the nearest test center for some kind of "fairness measure" or a visualization where test centers are missing
# calculate value: travel time weighted against the (log) population —
# higher value = inhabited cell that is far away from a test center
grid.loc[grid.error_case.isna() & (grid.travel > 1), 'value'] = np.log1p(grid.travel / grid.log_weight)
# cells with a test center or in its direct neighborhood get the best value
grid.loc[grid.travel <= 1, 'value'] = 0
# The plot
cmap = plt.get_cmap('plasma', 10)  # NOTE(review): unused — the plot below passes cmap='plasma' directly
ax = grid[grid.error_case.isna()].plot("value", figsize=(20,20), legend=True, cmap='plasma')
places.plot(ax=ax, color='red', markersize=3)
<AxesSubplot:>
# Save result with gps coordinates (epsg:4326) to geojson for further visualisation
grid.to_crs('epsg:4326').to_file('output/result.json', driver="GeoJSON")